Alfie
12/31/2020, 1:35 AM

Scott Moreland
12/31/2020, 5:08 PM
ws = instantiate_websocket_feed()  # persistent connection, run on its own thread

# this flow gets run on a schedule every few seconds
with Flow("execute-trades", schedule) as flow:
    latest_prices = get_prices(ws)
    execute_trades(latest_prices)

Is there a better way to structure this?
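One common restructuring, sketched in plain Python with no Prefect imports: make the connection a per-run resource rather than module-level state, so each scheduled run opens and closes its own short-lived connection and no global state leaks between runs. `PriceFeed` here is a hypothetical stand-in for the real websocket client, not an actual library.

```python
class PriceFeed:
    """Hypothetical feed client used only for illustration."""

    def __enter__(self):
        self.connected = True      # a real client would open the socket here
        return self

    def __exit__(self, *exc):
        self.connected = False     # ...and close it here

    def latest(self):
        return {"BTC": 29000.0}    # placeholder for live prices

def run_once():
    # one scheduled invocation: connect, read, (trade), disconnect
    with PriceFeed() as feed:
        return feed.latest()
```

Whether this beats a shared background thread depends on connection-setup cost; it trades a little latency per run for much simpler lifecycle management.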
Adam Roderick
12/31/2020, 6:53 PM
We end up with temp directories like tmp3v36vta4 after registration with cloud. This is cluttering up our repository. Is there a setting to avoid this?

Brett Naul
12/31/2020, 7:42 PM
[11 September 2020 5:46pm] Flow run is cancelling
Anyone know of a way to clear all of these out so they don't count against our concurrency limit?
bral
01/02/2021, 7:54 AM

Equipe AI HOC
01/02/2021, 11:28 AM

Adam Roderick
01/02/2021, 1:49 PM
flow.register() initializes a Client object with no arguments (https://github.com/PrefectHQ/prefect/blob/4d8337f75fe9bbb3024faa3b74a8f7debbb596d0/src/prefect/core/flow.py#L1663-L1665). From what I can tell, this will use the api_token value from config.cloud.auth_token (https://docs.prefect.io/api/latest/client/client.html). What should we do in a build server scenario? Will Client() read from an environment variable if one is set?
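For reference, Prefect 0.x builds its config from defaults and then overlays environment variables of the form PREFECT__&lt;SECTION&gt;__&lt;KEY&gt;, so on a build server you can export PREFECT__CLOUD__AUTH_TOKEN rather than committing a token. A minimal sketch of that resolution order in plain Python (the variable name follows the 0.x docs, but verify against your version; `resolve_auth_token` is a hypothetical helper, not Prefect API):

```python
import os

def resolve_auth_token(file_config: dict) -> str:
    # Environment override wins over the value loaded from config.toml,
    # mirroring Prefect's PREFECT__CLOUD__AUTH_TOKEN convention.
    env_override = os.environ.get("PREFECT__CLOUD__AUTH_TOKEN")
    if env_override:
        return env_override
    return file_config.get("cloud", {}).get("auth_token", "")
```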
Marwan Sarieddine
01/02/2021, 5:15 PM
With manual_only triggers for a mapped task, is there a way to approve all child tasks at once, or do I have to click through them manually? This gets unwieldy quite quickly.

Adam Roderick
01/02/2021, 5:33 PM

Adam Roderick
01/02/2021, 7:33 PM

Adam Roderick
01/02/2021, 11:24 PM

wiretrack
01/03/2021, 12:05 AM
flow.run(executor=Dask).
I did, however, start a Dask cluster locally and set the environment variables per the docs, but my tasks are still synchronous, not running in parallel, and I can't see anything in the Dask workers' logs. Is there anything I should be doing to enjoy the speed and benefits of parallel and distributed computation?
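For what it's worth, in Prefect 0.x flow.run() defaults to a serial LocalExecutor unless you pass an executor instance explicitly, e.g. flow.run(executor=DaskExecutor(address="tcp://...")); as far as I can tell, exporting Dask's environment variables alone does not switch the default executor, though that detail is worth verifying against your version. As a standard-library-only illustration of what to expect once a parallel executor is attached, independent tasks fan out instead of running one after another:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch(symbol):
    # stand-in for an independent, side-effect-free task
    return symbol, len(symbol)

def run_serial(symbols):
    # one task after another, like the default local executor
    return dict(map(fetch, symbols))

def run_parallel(symbols):
    # same results, but the calls may overlap in time
    with ThreadPoolExecutor(max_workers=4) as pool:
        return dict(pool.map(fetch, symbols))
```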
Adam Roderick
01/03/2021, 1:29 AM
Error while deploying flow: AttributeError("'str' object has no attribute 'get'")
Equipe AI HOC
01/03/2021, 5:16 PM

Vitaly Shulgin
01/03/2021, 5:33 PM
Custom functions aren't supported for `Schedule.adjustments`.
wiretrack
01/04/2021, 12:46 AM
server? I've been getting a long traceback while running the server locally. It seems that a flow_run is stuck cached somewhere, and some process is still firing a log. I think this happened after I cancelled and deleted a few flows and flow runs. I already uninstalled the server and installed it again, but with no success.

Sagun Garg
01/04/2021, 3:55 AM

Yannick
01/04/2021, 10:08 AM
• Task results need to be serializable with cloudpickle so that Dask workers can pass the data. Does that mean that data passing between the tasks is completely in the hands of Dask? Meaning that the data is passed over the network in case Dask schedules the tasks not to be on the same machine in the cluster?
• About large data, from the docs: "Don't worry about passing large data objects between tasks. As long as it fits in memory, Prefect can handle it with no special settings." What exactly should fit in memory here: the sum of all output data in the flow, or is there some sort of eviction going on? Example: when building a flow like A --> B, A --> C, and B --> D, should the output from A plus the output from B fit completely in memory? Secondly, from the docs: "(...) If it doesn't, there are ways to distribute the operation across a cluster." How would I go about doing such a thing?
• For Input Caching, is there any way to configure how this works? The docs state "Input caching is an automatic caching." and I would like additional control over input caching.
Many, many thanks! 🙏
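On the first bullet: as I understand it, yes, once results leave a worker they travel through Dask, so anything a task returns must survive serialization. Prefect and Dask use cloudpickle; the sketch below uses the standard library's pickle (a subset of what cloudpickle accepts) just to show the round-trip requirement. `survives_roundtrip` is a hypothetical helper for local checking, not part of either library.

```python
import pickle

def survives_roundtrip(obj) -> bool:
    # A task result that cannot be pickled cannot be shipped between
    # workers over the network. cloudpickle accepts more than pickle
    # (e.g. lambdas), so treat a False here as a warning, not a verdict.
    try:
        return pickle.loads(pickle.dumps(obj)) == obj
    except Exception:
        return False
```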
as
01/04/2021, 1:02 PM
task(get_data,
     result=LocalResult(serializer=JSONSerializer(), validators=lambda: False),
     target=join(p.data_path, "data.json"),
     checkpoint=True,
)
If a target file already exists here, is the task run anyway because the validator returns False?
What I want to achieve is to use a kind of "force_run" variable in a result validator (e.g. defined in a user config) that forces a task to be evaluated, even if there is already a result at the target location.
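The intended logic can be sketched in plain Python like this; whether Prefect actually consults a result validator when a target file already exists is exactly the open question above, so this shows the desired behavior rather than confirmed API semantics (`make_validator` and `force_run` are hypothetical names).

```python
def make_validator(force_run: bool):
    # Returns a validator that declares any cached result invalid while
    # force_run is set, so the task body is executed again even though a
    # file already exists at the target path.
    def validator(cached_result_exists: bool) -> bool:
        if force_run:
            return False               # never trust the cache
        return cached_result_exists    # otherwise reuse it if present
    return validator
```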
Equipe AI HOC
01/04/2021, 1:06 PM
cache_for, cache_validator, cache_key: from what I understood, these will not cache between flow runs (different processes) unless you configure target, result, and checkpoint. Is that correct? When I configure these last three "persistence" options, caching works between flows, but they don't seem to work integrated with cache_validator (or any of the first three "cache" options, for that matter). Is that correct? Are these independent mechanisms? What I want to achieve, put simply, is to cache between flow runs/processes and have a function (cache_validator) check whether the cache/file is still valid.
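The behavior being asked for, disk persistence across processes plus a validity check, can be sketched independently of Prefect. `cached_call` is a hypothetical helper illustrating the combined mechanism, not the Prefect API:

```python
import os
import time

def cached_call(fn, path, max_age_s, validator=None):
    # Reuse the file at `path` if it is young enough and passes the
    # optional validator; otherwise recompute and rewrite it. Because the
    # cache lives on disk, it survives across runs in new processes.
    if os.path.exists(path):
        fresh = (time.time() - os.path.getmtime(path)) < max_age_s
        if validator is not None:
            fresh = fresh and validator(path)
        if fresh:
            with open(path) as f:
                return f.read()
    value = fn()
    with open(path, "w") as f:
        f.write(value)
    return value
```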
Albert Franzi
01/04/2021, 2:23 PM
How do you register Flows with CI/CD tools? Do you have a side Docker container which pulls the master branch and registers new flows using the idempotency_key=flow.serialized_hash() approach (similar to Airflow)? Or any advice on a better way of doing it?
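One way to think about the idempotency_key=flow.serialized_hash() approach, sketched in plain Python with a dict standing in for Prefect Cloud's stored key: hash the serialized flow and only bump the version when the hash changes, so re-running the CI job on an unchanged branch is a no-op. The helpers here are illustrative, not Prefect's actual implementation.

```python
import hashlib
import json

def serialized_hash(flow_dict) -> str:
    # Deterministic fingerprint of the flow's serialized form.
    blob = json.dumps(flow_dict, sort_keys=True).encode()
    return hashlib.sha256(blob).hexdigest()

def register(flow_dict, registry) -> bool:
    """Return True when a new version is registered, False when skipped."""
    key = serialized_hash(flow_dict)
    name = flow_dict["name"]
    if registry.get(name) == key:
        return False        # unchanged since the last CI run: no-op
    registry[name] = key
    return True
```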
Matthew Blau
01/04/2021, 2:24 PM

wiretrack
01/04/2021, 2:37 PM

Charles Lariviere
01/04/2021, 6:16 PM
(prefect agent ecs start); but for Prefect to be able to run flows on a schedule, it requires an agent to be always running (since Prefect Cloud doesn't spin up agents). How does one actually do this without spending days experimenting?

Arash Roshani
01/04/2021, 6:37 PM
with statement definition of a flow? Does anyone have an example?
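Scott's message near the top of this log shows the pattern in use. Mechanically, the `with Flow(...) as flow:` block works by setting an "active flow" that task calls register themselves against; here is a toy standalone version of that idea (this is an illustration of the concept, not Prefect's actual implementation):

```python
class Flow:
    _active = None                     # the flow currently open in a `with`

    def __init__(self, name):
        self.name = name
        self.tasks = []

    def __enter__(self):
        Flow._active = self
        return self

    def __exit__(self, *exc):
        Flow._active = None

def task(fn):
    # Decorator: calling the task inside a `with Flow(...)` block adds it
    # to that flow instead of only executing it.
    def wrapper(*args, **kwargs):
        if Flow._active is not None:
            Flow._active.tasks.append(fn.__name__)
        return fn(*args, **kwargs)
    return wrapper

@task
def say_hello():
    return "hello"

with Flow("example") as flow:
    say_hello()
```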
Marc Lipoff
01/04/2021, 9:18 PM

jack
01/04/2021, 10:24 PM
{'type': ['Unsupported value: UniversalRun']}
{'_schema': 'Invalid data type: None'}
I'm not really able to run any newly built flows, so this has a massive impact on my work. Did anyone have this issue and manage to resolve it?

Aakash Indurkhya
01/04/2021, 10:45 PM

Aakash Indurkhya
01/04/2021, 10:58 PM

Aakash Indurkhya
01/04/2021, 10:58 PM