Yanghui Ou
10/19/2020, 11:06 PM
from prefect.engine.results import LocalResult
from prefect import task, Flow

@task(target="add_target.txt", checkpoint=True, result=LocalResult(dir="."))
def add(x, y=1):
    ret = x + y
    return ret

@task
def print_value(x):
    print(x)

with Flow("my handled flow!", result=LocalResult()) as flow:
    first_result = add(1, y=2)
    second_result = add(x=first_result, y=100)
    print_value(second_result)

flow.run()
The add task only runs once and the final result is always 3. I read the LocalResult code a bit, and it seems to me that LocalResult.exists simply checks whether the target file exists; if it does, the task enters a Cached state. So how can I get this example to work?
Another issue is that even if I have just one add task in the flow, after modifying the source code to change its input, it still enters a Cached state. How can I make the result handler check the task inputs as well?
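A minimal sketch of input-aware caching, assuming Prefect's cache_for / cache_validator API (prefect.engine.cache_validators.all_inputs) is acceptable in place of a file target; the cache duration below is illustrative:

from datetime import timedelta

from prefect import task, Flow
from prefect.engine.cache_validators import all_inputs

# Cache the result, but only reuse it when the task is called with the same inputs.
@task(cache_for=timedelta(hours=1), cache_validator=all_inputs)
def add(x, y=1):
    return x + y

with Flow("input-aware caching") as flow:
    first_result = add(1, y=2)                 # computed
    second_result = add(first_result, y=100)   # different inputs, so computed again

flow.run()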
Chirag
10/20/2020, 6:44 AM
Lewis Bails
10/20/2020, 7:29 AM
Failed: At least one upstream state has an unmappable result.
It first came about after I included a switch statement that chooses between a mapped task and a normal task that returns an iterable. For example:
with case(use_map, True):
    results = some_task.map(inputs)  # some_task returns mapped results
with case(use_map, False):
    results = manual_mapping_task(inputs)  # manual_mapping_task returns a Python iterable in its result
downstream_task(results)  # "Failed: At least one upstream state has an unmappable result"
Is this a valid use of "results" in the downstream task? Is it possible this is where my error is coming from?
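One way this is often handled (a sketch, assuming Prefect's merge task from prefect.tasks.control_flow is used to join the two case branches; the task bodies below are illustrative):

from prefect import task, Flow, Parameter, case
from prefect.tasks.control_flow import merge

@task
def some_task(x):
    return x * 2

@task
def manual_mapping_task(inputs):
    return [x * 2 for x in inputs]

@task
def downstream_task(results):
    print(results)

with Flow("branching") as flow:
    use_map = Parameter("use_map", default=True)
    inputs = [1, 2, 3]

    with case(use_map, True):
        mapped = some_task.map(inputs)
    with case(use_map, False):
        manual = manual_mapping_task(inputs)

    # merge returns whichever branch actually ran (the other branch is Skipped)
    results = merge(mapped, manual)
    downstream_task(results)

flow.run()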
Newskooler
10/20/2020, 10:34 AM
Lukas N.
10/20/2020, 1:36 PM
scheduled_start_time here. Easy enough for me to start contributing 🙂 Are you interested in this addition, or was there a reason behind not allowing this feature that I'm not seeing?
Adrien Boutreau
10/20/2020, 2:22 PM
Manuel Mourato
10/20/2020, 3:25 PM
@task
def my_inner_task():
    # do something
    pass

@task
def my_task_one():
    my_inner_task()
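Calling a task from inside another task's body typically fails, because a Task called like a function expects an active Flow context, which only exists while the flow is being defined. A common workaround (a sketch, assuming the inner logic does not need to be tracked as its own task run) is to call the task's underlying .run() method:

from prefect import task, Flow

@task
def my_inner_task():
    print("doing something")

@task
def my_task_one():
    # .run() executes the wrapped function immediately, outside the flow graph
    my_inner_task.run()

with Flow("nested call") as flow:
    my_task_one()

flow.run()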
Ralph Willgoss
10/20/2020, 3:43 PM
Adrien Boutreau
10/20/2020, 4:13 PM
mogui mogui
10/20/2020, 4:16 PM
Kubernetes Error: rpc error: code = Unknown desc = Error: image XXXXXX not found
So I've tried to tweak the job spec using KubernetesJobEnvironment(job_spec_file="job_spec.yaml"), adding an imagePullSecrets entry to the Job template.
But the custom YAML that I defined doesn't seem to be used when the job is spawned by the agent. What am I doing wrong?
Here is how I register the flow:
flow.environment = KubernetesJobEnvironment(job_spec_file="job_spec.yaml")
flow.storage = Docker(
    registry_url="REGISTRYURL",
    python_dependencies=['psycopg2-binary', 'pymssql'],
)
flow.register(args.register)
Jesper van Dijke
10/20/2020, 8:22 PM
Jesper van Dijke
10/20/2020, 8:22 PM
Jesper van Dijke
10/20/2020, 8:38 PM
new user: prefect, added to sudo and docker, nothing special regarding environment.
docker was already installed, I did add docker-compose
stopped my running postgres service, so it doesn't have any port conflicts, however I would love to use this one.
---> This creates a .local in my prefect home, navigate to /bin and /lib, took a closer look, run pip install prefect
---> creates a .prefect with a .toml file with setting: backend = "server", so far so good I guess. Now comes the next step in the documentation, run prefect backend server
---> Starts popping warnings regarding postgres:
prefect server start
WARNING: The PREFECT_SERVER_DB_CMD variable is not set. Defaulting to a blank string.
WARNING: The DB_CONNECTION_URL variable is not set. Defaulting to a blank string.
WARNING: The POSTGRES_DB variable is not set. Defaulting to a blank string.
WARNING: The POSTGRES_PASSWORD variable is not set. Defaulting to a blank string.
WARNING: The POSTGRES_USER variable is not set. Defaulting to a blank string.
Pulling postgres ... done
Pulling hasura ... done
Pulling graphql ... done
Pulling apollo ... done
Pulling towel ... done
Pulling ui ... done
WARNING: The PREFECT_SERVER_DB_CMD variable is not set. Defaulting to a blank string.
WARNING: The DB_CONNECTION_URL variable is not set. Defaulting to a blank string.
WARNING: The POSTGRES_DB variable is not set. Defaulting to a blank string.
WARNING: The POSTGRES_PASSWORD variable is not set. Defaulting to a blank string.
WARNING: The POSTGRES_USER variable is not set. Defaulting to a blank string.
Creating network "prefect-server" with the default driver
Creating tmp_postgres_1 ... done
Creating tmp_hasura_1 ... done
Creating tmp_graphql_1 ... done
Creating tmp_towel_1 ... done
Creating tmp_apollo_1 ... done
Creating tmp_ui_1 ... done
Attaching to tmp_postgres_1, tmp_hasura_1, tmp_graphql_1, tmp_towel_1, tmp_apollo_1, tmp_ui_1
graphql_1 | bash: -c: line 0: syntax error near unexpected token `&&'
graphql_1 | bash: -c: line 0: ` && python src/prefect_server/services/graphql/server.py'
hasura_1 | {"type":"pg-client","timestamp":"2020-10-20T20:15:35.282+0000","level":"warn","detail":{"message":"postgres connection failed, retrying(0)."}}
hasura_1 | {"type":"pg-client","timestamp":"2020-10-20T20:15:35.282+0000","level":"warn","detail":{"message":"postgres connection failed, retrying(1)."}}
hasura_1 | {"type":"startup","timestamp":"2020-10-20T20:15:35.282+0000","level":"error","detail":{"kind":"db_migrate","info":{"internal":"could not connect to server: No such file or directory\n\tIs the server running locally and accepting\n\tconnections on Unix domain socket \"/var/run/postgresql/.s.PGSQL.5432\"?\n","path":"$","error":"connection error","code":"postgres-error"}}}
postgres_1 | Error: Database is uninitialized and superuser password is not specified.
postgres_1 | You must specify POSTGRES_PASSWORD to a non-empty value for the
postgres_1 | superuser. For example, "-e POSTGRES_PASSWORD=password" on "docker run".
postgres_1 |
postgres_1 | You may also use "POSTGRES_HOST_AUTH_METHOD=trust" to allow all
postgres_1 | connections without a password. This is *not* recommended.
postgres_1 |
postgres_1 | See PostgreSQL documentation about "trust":
postgres_1 | https://www.postgresql.org/docs/current/auth-trust.html
So where do I start from here: adjust the docker-compose file, or is there another way to configure the local env?
Darshan
10/20/2020, 10:48 PM
Jesper van Dijke
10/20/2020, 11:19 PM
WARNING: The PREFECT_SERVER_DB_CMD variable is not set. Defaulting to a blank string.
WARNING: The DB_CONNECTION_URL variable is not set. Defaulting to a blank string.
WARNING: The POSTGRES_DB variable is not set. Defaulting to a blank string.
WARNING: The POSTGRES_PASSWORD variable is not set. Defaulting to a blank string.
WARNING: The POSTGRES_USER variable is not set. Defaulting to a blank string.
has to do with env vs export (shell vs bash). I've added a .env file with the postgres var information, and that actually worked when running docker-compose up.
I searched it and it has to do with docker-compose. Not sure if it's a bug in server.py.
Docker docker-compose environment vars
Maybe someone is able to verify?
Aaron Y
10/21/2020, 1:56 AM
data is a list of records/dicts, and I want to process them into a CSV.
with Flow('some flow') as flow:
    data = readData(source)
    transformedData = transformRecordsOneWay.map(data)
    transformedData = transformRecordsSecondWay.map(transformedData)
    transformedData = transformRecordsThirdWay.map(transformedData)
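One way to finish a pipeline like this (a sketch, assuming each record is a dict and that a plain, un-mapped task receiving the full mapped list is acceptable as the reduce step; the task and field names are illustrative):

import csv
from prefect import task, Flow

@task
def read_data():
    # Stand-in for the real readData(source) task.
    return [{"id": 1, "value": 10}, {"id": 2, "value": 20}]

@task
def transform_record(record):
    record["value"] = record["value"] * 2
    return record

@task
def write_csv(records, path="output.csv"):
    # Receiving the full mapped list (no .map here) acts as the reduce step.
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(records[0].keys()))
        writer.writeheader()
        writer.writerows(records)
    return path

with Flow("records to csv") as flow:
    data = read_data()
    transformed = transform_record.map(data)
    write_csv(transformed)

flow.run()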
Marwan Sarieddine
10/21/2020, 1:58 AM
Arsenii
10/21/2020, 8:54 AM
task_run_name was added to the @task decorator and it's beautiful -- however it prevents the flow from running locally, for obvious reasons.
This makes local debugging (before deployment to Cloud) a pain, since you have to remove and re-add this configuration all the time. Does anyone know a way to make Prefect just... ignore this parameter when the flow is run locally, e.g. with flow.run()?
Thanks!
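One possible workaround, sketched under the assumption that simply not applying the name template during local runs is acceptable; the RUN_LOCALLY environment flag below is a hypothetical convention, not a Prefect feature:

import os
from prefect import task, Flow

# Only apply task_run_name when not running locally.
# RUN_LOCALLY is a hypothetical flag you would set yourself before flow.run().
extra_task_kwargs = {}
if os.environ.get("RUN_LOCALLY") != "1":
    extra_task_kwargs["task_run_name"] = "say_hello-{name}"

@task(**extra_task_kwargs)
def say_hello(name):
    print(f"Hello, {name}!")

with Flow("naming") as flow:
    say_hello("world")

if __name__ == "__main__":
    flow.run()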
Alberto de Santos
10/21/2020, 9:25 AM
I have been exploring the with case possibilities and have the following question:
• Using with case only worked under the with Flow(...) as flow: statement. When using with case within a Task, it didn't work. Does that make sense?
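That behaviour sounds expected: case builds conditional branches in the flow graph, whereas inside a task you are just executing Python, so an ordinary if/else does the job there. A small sketch contrasting the two (task names are illustrative):

from prefect import task, Flow, Parameter, case

@task
def inside_a_task(flag):
    # Inside a task, plain Python control flow is all that's needed.
    if flag:
        return "True branch"
    return "False branch"

@task
def true_branch():
    return "flow-level True branch"

@task
def false_branch():
    return "flow-level False branch"

with Flow("case example") as flow:
    flag = Parameter("flag", default=True)

    # At the flow level, case adds conditional edges to the graph.
    with case(flag, True):
        true_branch()
    with case(flag, False):
        false_branch()

    inside_a_task(flag)

flow.run()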
Alberto de Santos
10/21/2020, 9:25 AM
Narasimhan Ramaswamy
10/21/2020, 12:27 PM
Unexpected error: TypeError('Could not serialize object of type Success.\nTraceback (most recent call last):\n File "/usr/local/lib/python3.8/dist-packages/distributed/protocol/pickle.py", line 49, in dumps\n result = pickle.dumps(x, **dump_kwargs)\nTypeError: cannot pickle \'SSLContext\' object\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n File "/usr/local/lib/python3.8/dist-packages/distributed/protocol/serialize.py", line 258, in serialize\n header, frames = dumps(x, context=context) if wants_context else dumps(x)\n File "/usr/local/lib/python3.8/dist-packages/distributed/protocol/serialize.py", line 61, in pickle_dumps\n frames[0] = pickle.dumps(\n File "/usr/local/lib/python3.8/dist-packages/distributed/protocol/pickle.py", line 60, in dumps\n result = cloudpickle.dumps(x, **dump_kwargs)\n File "/usr/local/lib/python3.8/dist-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps\n cp.dump(obj)\n File "/usr/local/lib/python3.8/dist-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump\n return Pickler.dump(self, obj)\nTypeError: cannot pickle \'SSLContext\' object\n')
Can't we serialize the output of a task and use DaskExecutor to run parallel mapped tasks?
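Task outputs are serialized with cloudpickle when a DaskExecutor ships them between workers, so any return value holding an SSLContext, open connection, or live client object will fail like this. A common pattern, sketched here with hypothetical helper names, is to return only plain data and build the unpicklable client inside the task that needs it:

from prefect import task, Flow, unmapped
from prefect.engine.executors import DaskExecutor  # import path for the 0.13.x series

@task
def get_connection_params():
    # Return only picklable data (strings, dicts), never a live client or SSLContext.
    return {"host": "db.example.com", "port": 5432}

@task
def process(item, params):
    # Build the unpicklable client inside the task that needs it.
    # client = make_client(**params)   # hypothetical client factory
    return item * 2

with Flow("dask-safe flow") as flow:
    params = get_connection_params()
    results = process.map(item=[1, 2, 3], params=unmapped(params))

flow.run(executor=DaskExecutor())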
Paul
10/21/2020, 12:48 PM
Ralph Willgoss
10/21/2020, 1:30 PM
Newskooler
10/21/2020, 3:13 PM
• open_db_connection - run only once during a flow
• save_to_db - run multiple times during one flow
• close_db_connection - run once at the end of the flow
Sometimes when I save_to_db, the connection has dropped out (for various reasons). Would a clean solution be to set the current save_to_db task as RETRY and then set one of the tasks it depends on (the open_db_connection task) as RETRY too?
This way it will (or so I expect) open a db connection first and then retry the failed task again.
🤔 Is my thinking correct and is that possible, or is it a messy solution?
Thanks : )
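One pattern that may fit (a sketch, assuming it's acceptable to open and close the connection inside the task that needs it, so that max_retries / retry_delay alone handles dropped connections; the connection helper is a hypothetical stand-in):

from datetime import timedelta
from prefect import task, Flow

def open_db_connection():
    # Hypothetical stand-in for the real connection helper (not a Prefect task).
    class Conn:
        def save(self, record):
            print("saved", record)
        def close(self):
            pass
    return Conn()

@task(max_retries=3, retry_delay=timedelta(seconds=10))
def save_to_db(record):
    # Opening and closing the connection inside the task means a retry always
    # starts from a fresh connection, so no upstream task has to be re-run
    # when the connection drops mid-flow.
    conn = open_db_connection()
    try:
        conn.save(record)
    finally:
        conn.close()

with Flow("db flow") as flow:
    save_to_db.map([{"id": 1}, {"id": 2}])

flow.run()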
Jonas Bernhard
10/21/2020, 3:43 PM
Is setting "cluster-autoscaler.kubernetes.io/safe-to-evict": "false" currently the way to go?
Other than that, we currently use the Helm charts from https://github.com/PrefectHQ/server/pull/57, which work pretty great so far. Is there a way to help push that forward, since "official" Helm charts would be pretty awesome? (I don't have a lot of experience with Helm, however.)
Jonas Bernhard
10/21/2020, 3:47 PM
We set --memory-limit 4GB in the dask-worker command, but also set a memory limit of memory: 4G in the Kubernetes resources. Are both required for correct functionality?
tom
10/21/2020, 4:26 PM
Mitchell Bregman
10/21/2020, 4:32 PM
Vincent
10/21/2020, 7:54 PM
Billy McMonagle
10/21/2020, 9:37 PM